From: Jeroen van der Heijden Date: Wed, 31 Oct 2018 13:36:55 +0000 (+0100) Subject: Added tee test X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~7^2~2^2~22^2~5 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=5fe4cd57aa2dedf7c0b1d9399bc3edfcbd5b8c7c;p=siridb-server.git Added tee test --- diff --git a/itest/run_all.py b/itest/run_all.py index 344c1d9b..822f36ec 100644 --- a/itest/run_all.py +++ b/itest/run_all.py @@ -16,6 +16,7 @@ from test_log import TestLog from test_log import TestLog from test_pipe_support import TestPipeSupport from test_buffer import TestBuffer +from test_tee import TestTee if __name__ == '__main__': @@ -32,3 +33,4 @@ if __name__ == '__main__': run_test(TestLog()) run_test(TestPipeSupport()) run_test(TestBuffer()) + run_test(TestTee()) diff --git a/itest/test_tee.py b/itest/test_tee.py new file mode 100644 index 00000000..fb07c306 --- /dev/null +++ b/itest/test_tee.py @@ -0,0 +1,97 @@ +import os +import asyncio +import functools +import random +import time +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import SiriDB +from testing import TestBase +from testing import SiriDBAsyncUnixServer +from testing import parse_args + + +PIPE_NAME = '/tmp/dbtest_tee.sock' + +DATA = { + 'series num_float': [ + [1471254705, 1.5], + [1471254707, -3.5], + [1471254710, -7.3]], + 'series num_integer': [ + [1471254705, 5], + [1471254708, -3], + [1471254710, -7]], + 'series_log': [ + [1471254710, 'log line one'], + [1471254712, 'log line two'], + [1471254714, 'another line (three)'], + [1471254716, 'and yet one more']] +} + +if os.path.exists(PIPE_NAME): + os.unlink(PIPE_NAME) + + +class TestTee(TestBase): + title = 'Test tee' + + def on_data(self, data): + for k, v in data.items(): + if k not in self._tee_data: + self._tee_data[k] = [] + self._tee_data[k].extend(v) + + + @default_test_setup(1, pipe_name=PIPE_NAME) + async def run(self): + self._tee_data = {} + + server = SiriDBAsyncUnixServer(PIPE_NAME, self.on_data) + + await server.create() + + await self.client0.connect() + + await self.client0.query( + 'alter servers set tee_pipe_name "{}"'.format(PIPE_NAME)) + + await asyncio.sleep(1) + + self.assertEqual( + await self.client0.insert(DATA), + {'success_msg': 'Successfully inserted 10 point(s).'}) + + self.assertAlmostEqual( + await self.client0.query('select * from "series num_float"'), + {'series num_float': DATA['series num_float']}) + + self.assertEqual( + await self.client0.query('select * from "series num_integer"'), + {'series num_integer': DATA['series num_integer']}) + + self.assertEqual( + await self.client0.query('select * from "series_log"'), + {'series_log': DATA['series_log']}) + + await asyncio.sleep(1) + + self.assertEqual(DATA, self._tee_data) + + self.client0.close() + + return False + + +if __name__ == '__main__': + parse_args() + run_test(TestTee()) diff --git a/itest/testing/__init__.py b/itest/testing/__init__.py index 9ce56c32..3f6cb861 100644 --- a/itest/testing/__init__.py +++ b/itest/testing/__init__.py @@ -16,6 +16,7 @@ from .testbase import default_test_setup from .testbase import TestBase from .series import Series from .pipe_client import PipeClient as SiriDBAsyncUnixConnection +from .pipe_server import PipeServer as SiriDBAsyncUnixServer from .args import parse_args from .task import Task diff --git a/itest/testing/pipe_server.py b/itest/testing/pipe_server.py new file mode 100644 index 00000000..3d0f2a08 --- /dev/null +++ b/itest/testing/pipe_server.py @@ -0,0 +1,83 @@ + +import logging +import asyncio +import struct +import qpack +from siridb.connector import SiriDBProtocol +from siridb.connector.lib.connection import SiriDBAsyncConnection + + +class Package: + + __slots__ = ('pid', 'length', 'tipe', 'checkbit', 'data') + + struct_datapackage = struct.Struct('tee->pipe_name_ = strndup( (char *) qp_obj.via.raw, qp_obj.len); - READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.") + + if (!(*siridb)->tee->pipe_name_) + { + READ_DB_EXIT_WITH_ERROR("Cannot allocate tee pipe name.") + } } else if (qp_obj.tp != QP_NULL) { diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index 920d0b39..e8f1eebe 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -1060,6 +1060,11 @@ static int INSERT_init_local( siridb_tasks_inc(siridb->tasks); siridb->insert_tasks++; + if (siridb_tee_is_connected(siridb->tee)) + { + siridb_tee_write(siridb->tee, promise); + } + uv_async_init(siri.loop, handle, INSERT_local_task); uv_async_send(handle); diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c index bc3ead7c..abccf508 100644 --- a/src/siri/db/listener.c +++ b/src/siri/db/listener.c @@ -4052,7 +4052,12 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) (cexpr_cb_t) siridb_server_cexpr_cb, &wserver)) { - siridb_tee_set_pipe_name(siridb->tee, pipe_name); + (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + if (siridb_save(siridb)) + { + log_critical("Could not save database changes (database: '%s')", + siridb->dbname); + } q_alter->n++; } @@ -4095,6 +4100,11 @@ static void exit_set_tee_pipe_name(uv_async_t * handle) if (server == siridb->server) { (void) siridb_tee_set_pipe_name(siridb->tee, pipe_name); + if (siridb_save(siridb)) + { + log_critical("Could not save database changes (database: '%s')", + siridb->dbname); + } SIRIPARSER_ASYNC_NEXT_NODE } diff --git a/src/siri/db/tee.c b/src/siri/db/tee.c index 53070f2d..32787e90 100644 --- a/src/siri/db/tee.c +++ b/src/siri/db/tee.c @@ -162,25 +162,26 @@ static void tee__on_connect(uv_connect_t * req, int status) "Cannot open pipe '%s' for reading", tee->pipe_name_) >= 0) { - log_error(tee->err_msg_); + log_warning(tee->err_msg_); } + goto fail; } - else - { - tee->flags |= SIRIDB_TEE_FLAG_CONNECTED; - } + tee->flags |= SIRIDB_TEE_FLAG_CONNECTED; + goto done; } - else + + if (asprintf( + &tee->err_msg_, + "Cannot connect to pipe '%s' (%s)", + tee->pipe_name_, + uv_strerror(status)) >= 0) { - if (asprintf( - &tee->err_msg_, - "Cannot connect to pipe '%s' (%s)", - tee->pipe_name_, - uv_strerror(status)) >= 0) - { - log_error(tee->err_msg_); - } + log_warning(tee->err_msg_); } + +fail: + uv_close((uv_handle_t *) req->handle, NULL); +done: free(req); } @@ -210,8 +211,8 @@ static void tee__on_data( sirinet_pipe_name((uv_pipe_t * ) client), uv_err_name(nread)); } - log_info("Disconnected from tee pipe: '%s'", - sirinet_pipe_name((uv_pipe_t * ) client)); + log_info("Disconnected from tee"); + tee->flags &= ~SIRIDB_TEE_FLAG_INIT; tee->flags &= ~SIRIDB_TEE_FLAG_CONNECTED; uv_close((uv_handle_t *) client, NULL); } diff --git a/src/siri/service/client.c b/src/siri/service/client.c index 127346cd..941efb42 100644 --- a/src/siri/service/client.c +++ b/src/siri/service/client.c @@ -618,9 +618,20 @@ static void CLIENT_on_file_database( siri_service_client_t * adm_client, sirinet_pkg_t * pkg) { - FILE * fp; + qp_fpacker_t * fpacker; qp_unpacker_t unpacker; - qp_obj_t qp_uuid; + qp_obj_t + qp_uuid, + qp_schema, + qp_dbname, + qp_time_precision, + qp_buffer_size, + qp_duration_num, + qp_duration_log, + qp_timezone, + qp_drop_threshold, + qp_points_limit, + qp_list_limit; siridb_t * siridb; int rc; /* 13 = strlen("database.dat")+1 */ @@ -629,31 +640,55 @@ static void CLIENT_on_file_database( qp_unpacker_init(&unpacker, pkg->data, pkg->len); - if (qp_is_array(qp_next(&unpacker, NULL)) && - /* schema check is not required at this moment but can be done here */ - qp_next(&unpacker, NULL) == QP_INT64 && - qp_next(&unpacker, &qp_uuid) == QP_RAW && - qp_uuid.len == 16) - { - memcpy(unpacker.pt - 16, &adm_client->uuid, 16); - } - else + if (!qp_is_array(qp_next(&unpacker, NULL)) || + qp_next(&unpacker, &qp_schema) != QP_INT64 || + qp_next(&unpacker, &qp_uuid) != QP_RAW || qp_uuid.len != 16 || + qp_next(&unpacker, &qp_dbname) != QP_RAW || + qp_next(&unpacker, &qp_time_precision) != QP_INT64 || + qp_next(&unpacker, &qp_buffer_size) != QP_INT64 || + qp_next(&unpacker, &qp_duration_num) != QP_INT64 || + qp_next(&unpacker, &qp_duration_log) != QP_INT64 || + qp_next(&unpacker, &qp_timezone) != QP_RAW || + qp_next(&unpacker, &qp_drop_threshold) != QP_DOUBLE) { CLIENT_err(adm_client, "invalid database file received"); return; } - fp = fopen(fn, "w"); + /* list and points limit require at least schema 1 */ + (void) qp_next(&unpacker, &qp_points_limit); + (void) qp_next(&unpacker, &qp_list_limit); - if (fp == NULL) + /* this is the tee pipe name when schema is >= 5 */ + (void) qp_next(&unpacker, NULL); + + if ((fpacker = qp_open(fn, "w")) == NULL) { CLIENT_err(adm_client, "cannot write or create file: %s", fn); return; } - rc = fwrite(pkg->data, pkg->len, 1, fp); - - if (fclose(fp) || rc != 1) + rc = ( qp_fadd_type(fpacker, QP_ARRAY_OPEN) || + qp_fadd_int64(fpacker, SIRIDB_SCHEMA) || + qp_fadd_raw(fpacker, (const unsigned char *) adm_client->uuid, 16) || + qp_fadd_raw(fpacker, qp_dbname.via.raw, qp_dbname.len) || + qp_fadd_int64(fpacker, qp_time_precision.via.int64) || + qp_fadd_int64(fpacker, qp_buffer_size.via.int64) || + qp_fadd_int64(fpacker, qp_duration_num.via.int64) || + qp_fadd_int64(fpacker, qp_duration_log.via.int64) || + qp_fadd_raw(fpacker, qp_timezone.via.raw, qp_timezone.len) || + qp_fadd_double(fpacker, qp_drop_threshold.via.real) || + qp_fadd_int64(fpacker, qp_points_limit.tp == QP_INT64 + ? qp_points_limit.via.int64 + : DEF_SELECT_POINTS_LIMIT) || + qp_fadd_int64(fpacker, qp_list_limit.tp == QP_INT64 + ? qp_list_limit.via.int64 + : DEF_LIST_LIMIT) || + qp_fadd_type(fpacker, QP_NULL) || + qp_fadd_type(fpacker, QP_ARRAY_CLOSE) || + qp_close(fpacker)); + + if (rc != 0) { CLIENT_err(adm_client, "cannot write or create file: %s", fn); return; diff --git a/src/siri/service/request.c b/src/siri/service/request.c index 1207f353..c60ab7a6 100644 --- a/src/siri/service/request.c +++ b/src/siri/service/request.c @@ -622,6 +622,7 @@ static cproto_server_t SERVICE_on_new_database( qp_fadd_double(fp, DEF_DROP_THRESHOLD) || qp_fadd_int64(fp, DEF_SELECT_POINTS_LIMIT) || qp_fadd_int64(fp, DEF_LIST_LIMIT) || + qp_fadd_type(fp, QP_NULL) || qp_fadd_type(fp, QP_ARRAY_CLOSE)) { rc = -1; diff --git a/src/xstr/xstr.c b/src/xstr/xstr.c index 358c4252..ee3dcd32 100644 --- a/src/xstr/xstr.c +++ b/src/xstr/xstr.c @@ -1,6 +1,7 @@ /* * xstr.c - Extra String functions used by SiriDB. */ +#include #include #include #include @@ -275,42 +276,48 @@ size_t xstr_extract_string(char * dest, const char * source, size_t len) double xstr_to_double(const char * src, size_t len) { char * pt = (char *) src; + assert (len); double d = 0; double convert; + uint64_t r1 = 0; switch (*pt) { case '-': + assert (len > 1); convert = -1.0; - pt++; + ++pt; + --len; break; case '+': - pt++; - /* FALLTHRU */ - /* no break */ + assert (len > 1); + convert = 1.0; + ++pt; + --len; + break; default: convert = 1.0; } - uint64_t r1 = *pt - '0'; - - while (--len && isdigit(*(++pt))) + for (; len && isdigit(*pt); --len, ++pt) { r1 = 10 * r1 + *pt - '0'; } d = (double) r1; - if (--len && *(pt++) == '.') + if (len && --len) { - uint64_t r2 = *pt - '0'; - ssize_t i; - for (i = -1; --len && isdigit(*(++pt)); i--) + uint64_t r2; + double power; + ++pt; + r2 = *pt - '0'; + for (power = -1.0f; --len && isdigit(*(++pt)); power--) { - r2 = 10 * r2 + *pt - '0'; + r2 = 10 * r2 + *pt - '0'; } - d += pow(10, i) * (double) r2; + d += pow(10.0f, power) * (double) r2; } return convert * d; diff --git a/test/test_grammar/test_grammar.c b/test/test_grammar/test_grammar.c index e5190df7..9f052bbc 100644 --- a/test/test_grammar/test_grammar.c +++ b/test/test_grammar/test_grammar.c @@ -18,7 +18,7 @@ int main() { test_start("grammar"); - cleri_grammar_t * grammar = compile_grammar(); + cleri_grammar_t * grammar = compile_siri_grammar_grammar(); assert_invalid(grammar, "select * from"); assert_invalid(grammar, "list"); diff --git a/test/test_lookup/sources b/test/test_lookup/sources index eee57c6b..a167b287 100644 --- a/test/test_lookup/sources +++ b/test/test_lookup/sources @@ -1,3 +1,3 @@ ../src/siri/db/lookup.c ../src/siri/err.c -../src/logger/logger.c \ No newline at end of file +../src/logger/logger.c diff --git a/test/test_siridb/sources b/test/test_siridb/sources index e6b944a3..100f4f76 100644 --- a/test/test_siridb/sources +++ b/test/test_siridb/sources @@ -66,6 +66,7 @@ ../src/siri/db/shard.c ../src/siri/db/shards.c ../src/siri/db/tasks.c +../src/siri/db/tee.c ../src/siri/db/time.c ../src/siri/db/user.c ../src/siri/db/users.c @@ -78,4 +79,4 @@ ../src/siri/service/request.c ../src/siri/help/help.c ../src/siri/cfg/cfg.c -../src/siri/grammar/grammar.c \ No newline at end of file +../src/siri/grammar/grammar.c diff --git a/test/test_xstr/test_xstr.c b/test/test_xstr/test_xstr.c index 6e883437..0cf3a135 100644 --- a/test/test_xstr/test_xstr.c +++ b/test/test_xstr/test_xstr.c @@ -12,9 +12,20 @@ int main() _assert (xstr_to_double("0.55", 3) == 0.5); _assert (xstr_to_double("123.456", 7) == 123.456); _assert (xstr_to_double("123", 3) == 123); + _assert (xstr_to_double("1234", 3) == 123); _assert (xstr_to_double("123.", 4) == 123); + _assert (xstr_to_double("123.", 3) == 123); + _assert (xstr_to_double("+1234", 4) == 123); + _assert (xstr_to_double("-1234", 4) == -123); _assert (xstr_to_double("123456.", 3) == 123); - _assert (xstr_to_double("-0.5", 3) == -0.5); + _assert (xstr_to_double("-0.5", 4) == -0.5); + _assert (xstr_to_double("-0.56", 4) == -0.5); + _assert (xstr_to_double("+0.5", 4) == 0.5); + _assert (xstr_to_double("+0.56", 4) == 0.5); + _assert (xstr_to_double("-.5", 3) == -0.5); + _assert (xstr_to_double("+.55", 3) == 0.5); + _assert (xstr_to_double(".55", 2) == 0.5); + _assert (xstr_to_double("-.55", 3) == -0.5); } return test_end();